package com.comleader.baseproject.common.kafka;

import cn.hutool.core.collection.CollectionUtil;
import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.validation.annotation.Validated;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Thin wrapper around {@link KafkaTemplate} that offers synchronous, asynchronous and
 * batch sending of string messages.
 *
 * @author 张鹏飞(1764)
 * @version 1.0.0
 * @since 2020/8/19
 */
@Component
@Validated
public class KafkaCommon {
    private static final Logger logger = LogManager.getLogger(KafkaCommon.class);

    /**
     * Seconds a blocking send waits for its result before it is treated as failed.
     */
    private static final long SEND_TIMEOUT_SECONDS = 10L;

    /**
     * Kafka message sequence number, originally documented as auto-incrementing.
     * NOTE(review): nothing in this class ever writes to this field, so the record key
     * derived from it is always "0". Confirm whether a constant key is intended before
     * changing it, because the key influences which partition a record is written to.
     */
    private volatile static int kafkaMsgIndex = 1;


    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * Sends one message and blocks until the send completes or the timeout elapses.
     *
     * @param topic target topic
     * @param msg   message payload
     * @return the record metadata taken from the send result
     * @throws RuntimeException wrapping any failure (timeout, interruption, send error);
     *                          the original exception is kept as the cause
     */
    public Object doSendSyn(String topic, String msg){
        try {
            logger.info("Send kafka msg result：topic:{},msg:{}",topic,msg);
            ListenableFuture<SendResult<String, String>> future =
                    this.kafkaTemplate.send(topic, currentMessageKey(), msg);
            // Give up when no result arrives within the timeout.
            return future.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS).getRecordMetadata();
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends one message without waiting for the result.
     *
     * @param topic target topic
     * @param msg   message payload
     * @return the future returned by the template; no timeout is applied here, callers
     *         decide how to wait on it
     * @throws RuntimeException wrapping any exception thrown while handing the message
     *                          to the template
     */
    public ListenableFuture<SendResult<String, String>> doSendAsyn(String topic, String msg){
        try {
            logger.info("Send kafka msg result：topic:{},msg:{}",topic,msg);
            return this.kafkaTemplate.send(topic, currentMessageKey(), msg);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends a batch of messages and reports the ones that could not be confirmed.
     * All messages are handed to the template first; the results are awaited afterwards,
     * each with its own timeout.
     *
     * @param topic target topic
     * @param msgs  message payloads
     * @return {@code null} when {@code msgs} is null or empty; otherwise the records whose
     *         send failed, timed out or was interrupted (empty when every send succeeded)
     */
    public List<ProducerRecord<String, String>> doBatchSend(String topic, Collection<String> msgs){
        if (CollectionUtil.isEmpty(msgs)){
            return null;
        }
        // Build the records ourselves so a failed send can be reported with its payload;
        // the send result is not available when waiting on the future throws.
        List<ProducerRecord<String, String>> records = new ArrayList<>(msgs.size());
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>(msgs.size());
        for (String msg : msgs) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, currentMessageKey(), msg);
            records.add(record);
            futures.add(this.kafkaTemplate.send(record));
        }

        List<ProducerRecord<String, String>> errorList = new ArrayList<>();
        boolean interrupted = false;
        for (int i = 0; i < futures.size(); i++) {
            ProducerRecord<String, String> record = records.get(i);
            try {
                // Each result is awaited for at most the send timeout.
                SendResult<String, String> result = futures.get(i).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (result == null || result.getProducerRecord() == null){
                    errorList.add(record);
                }
            } catch (InterruptedException e) {
                // Remember the interrupt and keep collecting the remaining results;
                // the flag is restored once the loop is done.
                interrupted = true;
                errorList.add(record);
                logger.error("Kafka batch send interrupted, topic:{},msg:{}", topic, record.value(), e);
            } catch (Exception e) {
                errorList.add(record);
                logger.error("Kafka batch send failed, topic:{},msg:{}", topic, record.value(), e);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return errorList;
    }

    /**
     * Builds the record key shared by every send method from the sequence number.
     */
    private static String currentMessageKey() {
        return String.valueOf(kafkaMsgIndex - 1);
    }

}
